package com.jie.flink.cdc.config;

import com.jie.flink.cdc.doman.FlinkCdcSinkType;
import com.jie.flink.cdc.doman.FlinkCdcSourceType;
import com.jie.flink.cdc.doman.FlinkCdcStreamProperties;
import com.jie.flink.cdc.flinksink.FlinkSinkBuilder;
import com.jie.flink.cdc.flinksink.config.SinkConfigProperties;
import com.jie.flink.cdc.flinksink.config.SinkConfigPropertiesFactory;
import com.jie.flink.cdc.flinksource.config.SourceConfigProperties;
import com.jie.flink.cdc.service.FlinkCdcService;
import com.jie.flink.cdc.util.ColumnUtil;
import com.jie.flink.cdc.util.FileUtil;
import com.jie.flink.cdc.util.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.connector.sink2.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
 * @author zhanggj
 * @date 2023/5/31 17:38
 * @desc
 */
@Slf4j
@Configuration
public class FlinkCdcStreamConfig {

    public static final String FLINK_CDC_SERVICE_LIST_NAME = "flinkCdcServerList";

    private static final String FLINK_STREAM_LIST_SOURCE_KEY = "source";
    private static final String FLINK_STREAM_LIST_SINK_KEY_START = "sink";
    private static final String FLINK_STREAM_LIST_TYPE_KEY = "type";

    @Bean(FLINK_CDC_SERVICE_LIST_NAME)
    public List<FlinkCdcService> buildFlinkCdcServiceList(FlinkCdcStreamConfigProperties streamConfigProperties) {
        final List<FlinkCdcService> serviceList = new ArrayList<>();
        // 从配置文件中获取并配置FlinkCdcService
        final List<FlinkCdcService> propertiesServiceList = buildFromProperties(streamConfigProperties);
        if (CollectionUtils.isNotEmpty(propertiesServiceList)) {
            serviceList.addAll(propertiesServiceList);
        }
        final List<FlinkCdcService> jsonFileServiceList = buildFromJsonFile();
        if (CollectionUtils.isNotEmpty(jsonFileServiceList)) {
            serviceList.addAll(jsonFileServiceList);
        }
        if (CollectionUtils.isEmpty(serviceList)) {
            return null;
        }
        return serviceList;
    }

    private List<FlinkCdcService> buildFromJsonFile() {
        // 读文件
        String streamStr = null;
        try {
            streamStr = FileUtil.readAllBytes("flink-cdc-stream.json");
        } catch (Exception e) {
            log.error("read flink stream json file exception:", e);
        }
        if (StringUtils.isBlank(streamStr)) {
            log.warn("flink stream file read is empty");
            return null;
        }
        final List<Map> streamMapList = JsonUtils.toJSONArray(streamStr, Map.class);
        if (CollectionUtils.isEmpty(streamMapList)) {
            return null;
        }
        final AtomicInteger runOrder = new AtomicInteger(1);
        return streamMapList.stream().map(streamMap -> {
            final FlinkCdcStreamProperties streamProperties = JsonUtils.mapToObject(streamMap, FlinkCdcStreamProperties.class);
            if (Objects.isNull(streamProperties)
                    || StringUtils.isEmpty(streamProperties.getJobName())
                    || Objects.isNull(streamProperties.getSinkConfigList())
                    || Objects.isNull(streamProperties.getSourceConfigProperties())) {
                return null;
            }
            final List<Sink<String>> sinkList = SinkConfigPropertiesFactory.buildSinkList(streamMap);
            final SourceConfigProperties sourceConfig = JsonUtils.mapToObject(
                    (Map) streamMap.get(ColumnUtil.getFieldName(FlinkCdcStreamProperties :: getSourceConfigProperties)),
                    streamProperties.getSourceConfigProperties().getSourceType().getSourceConfigClass());
            final FlinkCdcService flinkCdcService = SourceConfigProperties.sourceMap.get(sourceConfig.getKey());

            flinkCdcService.setSinkList(sinkList);
            flinkCdcService.setRunOrder(runOrder.getAndIncrement());
            flinkCdcService.setJobName(streamProperties.getJobName());
            return flinkCdcService;
        }).filter(Objects::nonNull).collect(Collectors.toList());
    }

    private List<FlinkCdcService> buildFromProperties(final FlinkCdcStreamConfigProperties streamConfigProperties) {
        if (Objects.isNull(streamConfigProperties)) {
            return null;
        }
        final Map<String, Map<String, Map<String, String>>> flinkStreamMap = streamConfigProperties.getFlinkStream();
        if (MapUtils.isEmpty(flinkStreamMap)) {
            return null;
        }
        final List<FlinkCdcService> serviceList = new ArrayList<>();
        flinkStreamMap.forEach((streamKey, streamValue) -> {
            // 处理source配置
            final Map<String, String> sourceMap = streamValue.get(FLINK_STREAM_LIST_SOURCE_KEY);
            if (MapUtils.isEmpty(sourceMap)) {
                log.error("flink stream config {} lost source", streamKey);
                return;
            }
            final FlinkCdcSourceType sourceType = FlinkCdcSourceType.getByCode(sourceMap.get(FLINK_STREAM_LIST_TYPE_KEY));
            final SourceConfigProperties sourceConfig = JsonUtils.stringToObject(JsonUtils.toJSONString(sourceMap), sourceType.getSourceConfigClass());

            // 获取service
            final FlinkCdcService flinkCdcService = SourceConfigProperties.sourceMap.get(sourceConfig.getKey());

            flinkCdcService.setJobName(streamKey);

            // 处理sink配置
            final List<Sink<String>> sinkList = new ArrayList<>();
            final AtomicInteger runOrder = new AtomicInteger(1);
            streamValue.forEach((sinkKey, sinkMap) -> {
                if (FLINK_STREAM_LIST_SOURCE_KEY.equals(sinkKey)
                        || !sinkKey.startsWith(FLINK_STREAM_LIST_SINK_KEY_START)) {
                    return;
                }
                final FlinkCdcSinkType sinkType = FlinkCdcSinkType.getByCode(sinkMap.get(FLINK_STREAM_LIST_TYPE_KEY));
                final SinkConfigProperties sinkConfig = JsonUtils.stringToObject(JsonUtils.toJSONString(sinkMap), sinkType.getSinkConfigClass());
                final FlinkSinkBuilder flinkSinkBuilder = SinkConfigProperties.sinkMap.get(sinkConfig.getKey());
                if (Objects.isNull(flinkSinkBuilder)) {
                    sinkList.add(flinkSinkBuilder.buildSink(sinkConfig));
                }
            });

            if (CollectionUtils.isEmpty(sinkList)) {
                throw new RuntimeException("flink stream 配置错误，缺少sink配置");
            }
            flinkCdcService.setSinkList(sinkList);
            flinkCdcService.setRunOrder(runOrder.getAndIncrement());
            serviceList.add(flinkCdcService);
        });
        return serviceList;
    }
}
